{
  "metadata" : {
    "id" : "75765162-2d2f-4070-b5b4-9818cac60853",
    "name" : "Structured-Streaming-in-action",
    "user_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "sparkNotebook" : null,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : [ "org.apache.spark %% spark-sql % 2.3.0", "org.apache.spark %% spark-sql-kafka-0-10 % 2.3.0" ],
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null,
    "customVars" : null
  },
  "cells" : [ {
    "metadata" : {
      "id" : "6E2995E02B244E978E1327B7A60484F0"
    },
    "cell_type" : "markdown",
    "source" : "# Structured Streaming - Kafka Example\n\nThis example explores the main aspects of the Structured Streaming API.\n\n - We use the Kafka `source` to consume the `iot-data` topic.\n - We use a file `sink` to store the data in a _Parquet_ file.\n\nTo run this example, you also need:\n\n- A running Kafka broker. We suggest using the easy-to-run _dockerized_ version maintained by Spotify: https://hub.docker.com/r/spotify/kafka/\n- A reference file listing the parameters of each sensor. This file must be generated with [reference-data-generator](./reference-data-generator.snb.ipynb)\n- Our data producer notebook: [kafka_sensor_data_producer](./kafka_sensor_data_producer.snb.ipynb)\n\nBecause Kafka acts as a broker between producer and consumer, you can run the two notebooks in either order.\nNevertheless, we suggest running the producer first so that data is available when we go through this example."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "447E7A8553BF47FD8416FC260B1A582C"
    },
    "cell_type" : "code",
    "source" : [ "import org.apache.spark.sql.kafka010._" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "3B783C2DA5A9409E85DF2CE7F061AECB"
    },
    "cell_type" : "code",
    "source" : [ "import java.io.File\n", "// Kafka\n", "val kafkaBootstrapServer = \"127.0.0.1:9092\"\n", "val topic = \"iot-data\"\n", "\n", "// File system\n", "val workDir = \"/tmp/streaming-with-spark\"\n", "val referenceFile = \"sensor-records.parquet\"\n", "val targetFile = \"structured_enrichedIoTStream.parquet\"\n", "val targetPath = new File(workDir, targetFile).getAbsolutePath\n", "val unknownSensorsTargetFile = \"unknownSensorsStream.parquet\"\n", "val unknownSensorsTargetPath = new File(workDir, unknownSensorsTargetFile).getAbsolutePath\n" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "3225F1A3939642B087F1BE6A37EB9D03"
    },
    "cell_type" : "code",
    "source" : [ "val rawData = sparkSession.readStream\n", "      .format(\"kafka\")\n", "      .option(\"kafka.bootstrap.servers\", kafkaBootstrapServer)\n", "      .option(\"subscribe\", topic)\n", "      .option(\"startingOffsets\", \"earliest\")\n", "      .load()" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "497CEEFAB7DF40F884CC7A8139C3DA5F"
    },
    "cell_type" : "code",
    "source" : [ "rawData.isStreaming" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "A74BF086DCC240168F21E57797088678"
    },
    "cell_type" : "code",
    "source" : [ "rawData.printSchema()" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "A39D7FB1A9AC496F8DFC7502EB0A4C29"
    },
    "cell_type" : "code",
    "source" : [ "case class SensorData(sensorId: Int, timestamp: Long, value: Double)" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "6E729EE3495E4AA88FC9E347BDEE3210"
    },
    "cell_type" : "code",
    "source" : [ "import scala.util.Try\n", "\n", "// Parse each CSV record; malformed records are silently dropped\n", "val iotData = rawData.select($\"value\").as[String].flatMap { record =>\n", "  val fields = record.split(\",\")\n", "  Try {\n", "    SensorData(fields(0).toInt, fields(1).toLong, fields(2).toDouble)\n", "  }.toOption\n", "}" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "CD174F6E3EF1475AA51899B5931B7415"
    },
    "cell_type" : "markdown",
    "source" : "## Load the Reference Data from a Parquet File\nWe also cache the data to keep it in memory and improve the performance of our streaming application."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "0314D4CF7E88448EB2A5BDBED5839282"
    },
    "cell_type" : "code",
    "source" : [ "val sensorRef = sparkSession.read.parquet(s\"$workDir/$referenceFile\")\n", "sensorRef.cache()" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "B3CA2A3BCDD14C4F8F8A51EF6345D8F2"
    },
    "cell_type" : "markdown",
    "source" : "## Join the Reference Data with the Stream to Compute the Enriched Values"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "3A7A1D84B28542CB81342CC1AA13B0E9"
    },
    "cell_type" : "code",
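    "source" : [ "// Inner join with the static reference data: readings from unknown sensors are dropped\n", "val sensorWithInfo = sensorRef.join(iotData, Seq(\"sensorId\"), \"inner\")\n", "\n", "// Rescale each reading to the sensor's range (this assumes `value` is normalized to [0, 1])\n", "val knownSensors = sensorWithInfo\n", "  .withColumn(\"dnvalue\", $\"value\" * ($\"maxRange\" - $\"minRange\") + $\"minRange\")\n", "  .drop(\"value\", \"maxRange\", \"minRange\")" ],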
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "0A84FF9B41F0495C93E8D9D1B6AD721E"
    },
    "cell_type" : "markdown",
    "source" : "## Write the Results to a Parquet File"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "967DA9CD00034B93A9F9077ACCABBF69"
    },
    "cell_type" : "code",
    "source" : [ "val query = knownSensors.writeStream\n", "  .outputMode(\"append\")\n", "  .format(\"parquet\")\n", "  .option(\"path\", targetPath)\n", "  .option(\"checkpointLocation\", workDir + \"/iot-checkpoint\")\n", "  .start()" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "presentation" : {
        "tabs_state" : "{\n  \"tab_id\": \"#tab636027600-0\"\n}",
        "pivot_chart_state" : "{\n  \"hiddenAttributes\": [],\n  \"menuLimit\": 200,\n  \"cols\": [],\n  \"rows\": [],\n  \"vals\": [],\n  \"exclusions\": {},\n  \"inclusions\": {},\n  \"unusedAttrsVertical\": 85,\n  \"autoSortUnusedAttrs\": false,\n  \"inclusionsInfo\": {},\n  \"aggregatorName\": \"Count\",\n  \"rendererName\": \"Table\"\n}"
      },
      "id" : "C60D42C6AA7C43108AF52A4EEA7B407E"
    },
    "cell_type" : "code",
    "source" : [ "\n", "query.recentProgress" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "A8292D947D7E4A889B416F5C2BB158C7"
    },
    "cell_type" : "code",
    "source" : [ "" ],
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}